package com.ruc.dbiir.rest.datasource;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.ruc.dbiir.rest.utils.Config;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Reads (consumes) data from a Kafka message queue using the ZooKeeper-based
 * high-level consumer API.
 *
 * <p>Typical use: {@code new ReadKafka2().consume();}. {@link #consume()} blocks
 * while it reads, so it is normally run on its own thread; call
 * {@link #shutdown()} from another thread to stop it.
 *
 * <p>Created May 26, 2018 1:27:41 PM.
 *
 * @author mark
 */


//@Component(value = "readKafka")
public class ReadKafka2 {

	/** ZooKeeper connect string used by the no-argument constructor. */
	private static final String DEFAULT_ZOOKEEPER_CONNECT = "202.112.113.71:2181";

	/** Consumer group id used by the no-argument constructor. */
	private static final String DEFAULT_GROUP_ID = "jd-group";

	/** Connector to the Kafka cluster; created once in the constructor. */
	private final ConsumerConnector consumer;


	/**
	 * Creates a consumer connected to the default ZooKeeper ensemble
	 * ({@code 202.112.113.71:2181}) as a member of the {@code jd-group} consumer group.
	 */
	public ReadKafka2() {
		this(DEFAULT_ZOOKEEPER_CONNECT, DEFAULT_GROUP_ID);
	}


	/**
	 * Creates a consumer for the given ZooKeeper ensemble and consumer group.
	 *
	 * @param zookeeperConnect value for {@code zookeeper.connect}, e.g. {@code "host:2181"}
	 * @param groupId          value for {@code group.id}, identifying the consumer group
	 */
	public ReadKafka2(String zookeeperConnect, String groupId) {
		Properties props = new Properties();
		// ZooKeeper configuration
		props.put("zookeeper.connect", zookeeperConnect);

		// group.id identifies the consumer group this consumer belongs to
		props.put("group.id", groupId);

		// ZooKeeper session timeout.
		// NOTE(review): 4000000 ms is roughly 67 minutes, which is unusually large for a
		// session timeout -- confirm this value is intentional.
		props.put("zookeeper.session.timeout.ms", "4000000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		// Start from the smallest available offset when the group has no committed offset.
		props.put("auto.offset.reset", "smallest");
		// No "serializer.class" entry: that is a producer setting. On the consumer side the
		// key/value decoders are passed explicitly to createMessageStreams in consume().

		ConsumerConfig config = new ConsumerConfig(props);

		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
	}


	/**
	 * Consumes messages from the {@code Config.TOPIC_ERRORWATER} topic, printing each
	 * message to standard output and offering it to {@code Config.ERRORQUEUE}.
	 *
	 * <p>This method blocks for as long as the stream iterator keeps reporting further
	 * messages. When the loop ends, or an exception is thrown, the underlying connector
	 * is shut down so that its resources are released; the instance is not intended to
	 * be reused after this method returns.
	 *
	 * @throws IllegalStateException if no stream was created for the topic
	 */
	public void consume() {
		try {
			// Request a single stream (one consuming thread) for the topic.
			Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
			topicCountMap.put(Config.TOPIC_ERRORWATER, Integer.valueOf(1));

			StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
			StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

			Map<String, List<KafkaStream<String, String>>> consumerMap =
					consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

			// Consume the errorWater data
			List<KafkaStream<String, String>> errorStreams = consumerMap.get(Config.TOPIC_ERRORWATER);
			if (errorStreams == null || errorStreams.isEmpty()) {
				throw new IllegalStateException(
						"No Kafka stream was created for topic " + Config.TOPIC_ERRORWATER);
			}

			ConsumerIterator<String, String> errorIt = errorStreams.get(0).iterator();
			while (errorIt.hasNext()) {
				String errorMsg = errorIt.next().message();
				System.out.println(errorMsg+"=====");
				// NOTE(review): the result of offer() is not checked; if ERRORQUEUE is a
				// bounded queue a rejected message would be dropped silently -- confirm.
				Config.ERRORQUEUE.offer(errorMsg);
			}
		} finally {
			// Always release the connector, even when consumption fails part-way.
			shutdown();
		}
	}


	/**
	 * Shuts down the underlying Kafka consumer connector. Intended to be called from
	 * another thread to stop a running {@link #consume()}.
	 */
	public void shutdown() {
		consumer.shutdown();
	}
}
